package com.atguigu.app;

import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo job that joins a Kafka-backed {@code order_info} table against the
 * JDBC-backed {@code base_dic} dimension table with a
 * {@code FOR SYSTEM_TIME AS OF} join and prints the joined rows.
 */
public class Flink04_LookUpJoin {

    /**
     * DDL for the temporary {@code base_dic} dimension table, read through the
     * JDBC connector. The lookup cache is configured for 10 rows with a TTL of
     * one hour.
     */
    private static final String BASE_DIC_DDL = String.join("",
            "CREATE TEMPORARY TABLE base_dic ( ",
            "  dic_code STRING, ",
            "  dic_name STRING, ",
            "  parent_code STRING, ",
            "  create_time STRING, ",
            "  operate_time STRING ",
            ") WITH ( ",
            "  'connector' = 'jdbc', ",
            "  'url' = 'jdbc:mysql://hadoop102:3306/gmall-211227-flink', ",
            "  'table-name' = 'base_dic', ",
            "  'driver' = 'com.mysql.cj.jdbc.Driver', ",
            "  'lookup.cache.max-rows' = '10', ",
            "  'lookup.cache.ttl' = '1 hour', ",
            "  'username' = 'root', ",
            "  'password' = '000000' ",
            ")");

    /**
     * Column list of the {@code order_info} main table. {@code pt} is a
     * computed processing-time column; the connector clause is appended at
     * runtime from {@link MyKafkaUtil#getKafkaDDL}.
     */
    private static final String ORDER_INFO_COLUMNS = String.join("",
            "CREATE TABLE order_info ( ",
            "  `id` STRING, ",
            "  `state_code` STRING, ",
            "  `pt` AS PROCTIME() ",
            ") ");

    /**
     * Query joining each order row to the {@code base_dic} row whose
     * {@code dic_code} equals the order's {@code state_code}, as of the
     * order's processing time.
     */
    private static final String LOOKUP_JOIN_QUERY = String.join("",
            "select ",
            "    o.id, ",
            "    o.state_code, ",
            "    dic.dic_code, ",
            "    dic.dic_name ",
            "from ",
            "    order_info o ",
            "join  ",
            "    base_dic FOR SYSTEM_TIME AS OF o.pt dic ",
            "on o.state_code=dic.dic_code");

    /**
     * Entry point: creates the table environment with parallelism 1, registers
     * both tables, then runs the join and prints its result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        registerDimensionTable(tableEnv);
        registerOrderTable(tableEnv);
        printLookupJoin(tableEnv);

        // Optional debugging aid: print the dimension table contents.
//        tableEnv.sqlQuery("select dic_code,dic_name,parent_code from base_dic")
//                .execute()
//                .print();

    }

    /** Registers the JDBC-backed {@code base_dic} dimension table. */
    private static void registerDimensionTable(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql(BASE_DIC_DDL);
    }

    /** Registers the {@code order_info} main table that reads from Kafka. */
    private static void registerOrderTable(StreamTableEnvironment tableEnv) {
        // NOTE(review): the two arguments are presumably the Kafka topic and the
        // consumer group id - confirm against MyKafkaUtil.getKafkaDDL.
        String connectorClause = MyKafkaUtil.getKafkaDDL("test", "test_211227");
        tableEnv.executeSql(ORDER_INFO_COLUMNS + connectorClause);
    }

    /** Joins the main table with the dimension table and prints the result. */
    private static void printLookupJoin(StreamTableEnvironment tableEnv) {
        tableEnv.sqlQuery(LOOKUP_JOIN_QUERY)
                .execute()
                .print();
    }

}
